Программирование сетевых приложений

Организация потоков, параллельной обработки, синхронизации и распределенной обработки синхронизуемых участков кода

Программирование сетевых приложений

Содержание лекции

  • Общее представление о потоках
  • Создание, остановка и соединение потоков
  • Планирование потоков и управление ими
  • Синхронизация потоков
  • Синхронизированные методы и блоки операторов
  • Тупики и методы их предотвращения
  • Коммуникация между потоками
  • Примеры на C++ и Qt
Организация потоков и синхронизация
Программирование сетевых приложений

Введение в многопоточность

Многопоточность - это механизм, позволяющий программе выполнять несколько операций одновременно. В современных приложениях потоки используются для повышения производительности, отзывчивости интерфейса и эффективного использования многоядерных процессоров.

Организация потоков и синхронизация
Программирование сетевых приложений

Основные понятия

  • Поток (thread) - наименьшая единица обработки, которую может запланировать операционная система
  • Процесс (process) - экземпляр выполняющейся программы с собственным адресным пространством
  • Параллелизм - одновременное выполнение нескольких вычислений
  • Синхронизация - координация выполнения потоков для предотвращения конфликтов
Организация потоков и синхронизация
Программирование сетевых приложений

Создание потоков в C++

#include <iostream>
#include <thread>
#include <chrono>

void workerFunction(int id) {
    std::cout << "Поток " << id << " начал работу" << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Поток " << id << " завершил работу" << std::endl;
}

int main() {
    std::cout << "Главный поток: создание потоков" << std::endl;
    
    // Создание потоков
    std::thread thread1(workerFunction, 1);
    std::thread thread2(workerFunction, 2);
    
    // Ожидание завершения потоков
    thread1.join();
    thread2.join();
    
    std::cout << "Главный поток: все потоки завершены" << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Потоки в Qt

#include <QThread>
#include <QDebug>
#include <QCoreApplication>

class Worker : public QObject {
    Q_OBJECT
    
public slots:
    void doWork() {
        qDebug() << "Рабочий поток:" << QThread::currentThreadId();
        emit workFinished();
    }
    
signals:
    void workFinished();
};

class Controller : public QObject {
    Q_OBJECT
    
public:
    Controller() {
        Worker* worker = new Worker;
        QThread* workerThread = new QThread(this);
        
        // Перемещаем worker в новый поток
        worker->moveToThread(workerThread);
        
        // Связываем сигналы и слоты
        connect(workerThread, &QThread::started, worker, &Worker::doWork);
        connect(worker, &Worker::workFinished, workerThread, &QThread::quit);
        connect(worker, &Worker::workFinished, worker, &Worker::deleteLater);
        connect(workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
        
        workerThread->start();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Наследование от QThread

#include <QThread>
#include <QDebug>

class CustomThread : public QThread {
    Q_OBJECT
    
protected:
    void run() override {
        qDebug() << "Поток начал выполнение:" << currentThreadId();
        
        for (int i = 0; i < 5; ++i) {
            qDebug() << "Итерация" << i << "в потоке" << currentThreadId();
            msleep(1000); // Пауза 1 секунда
        }
        
        qDebug() << "Поток завершил выполнение:" << currentThreadId();
    }
};

int main(int argc, char *argv[]) {
    QCoreApplication app(argc, argv);
    
    CustomThread thread;
    thread.start();
    thread.wait(); // Ожидание завершения потока
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Синхронизация потоков с использованием мьютексов

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;
int sharedCounter = 0;

void incrementCounter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        mtx.lock();
        ++sharedCounter;
        mtx.unlock();
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
    std::cout << "Ожидаемое значение: " << numThreads * iterationsPerThread << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Блокировка с использованием std::lock_guard

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx;
int sharedCounter = 0;

void incrementCounter(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        ++sharedCounter;
        // Мьютекс автоматически освобождается при выходе из области видимости
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementCounter, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Итоговое значение счетчика: " << sharedCounter << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Синхронизация в Qt с использованием QMutex

#include <QCoreApplication>
#include <QThread>
#include <QMutex>
#include <QDebug>
#include <QVector>

class Counter : public QObject {
    Q_OBJECT
    
private:
    int value;
    QMutex mutex;
    
public:
    Counter() : value(0) {}
    
public slots:
    void increment() {
        mutex.lock();
        int temp = value;
        QThread::msleep(1); // Имитация работы
        value = temp + 1;
        mutex.unlock();
    }
    
    int getValue() const {
        mutex.lock();
        int result = value;
        mutex.unlock();
        return result;
    }
};

class Worker : public QObject {
    Q_OBJECT
    
private:
    Counter* counter;
    
public:
    Worker(Counter* c) : counter(c) {}
    
public slots:
    void doWork() {
        for (int i = 0; i < 1000; ++i) {
            counter->increment();
        }
        emit workFinished();
    }
    
signals:
    void workFinished();
};
Организация потоков и синхронизация
Программирование сетевых приложений

Атомарные операции

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

std::atomic<int> atomicCounter(0);

void incrementAtomic(int iterations) {
    for (int i = 0; i < iterations; ++i) {
        atomicCounter.fetch_add(1, std::memory_order_relaxed);
    }
}

int main() {
    const int numThreads = 4;
    const int iterationsPerThread = 100000;
    
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(incrementAtomic, iterationsPerThread);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Атомарный счетчик: " << atomicCounter.load() << std::endl;
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Условные переменные

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> dataQueue;
bool finished = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        std::unique_lock<std::mutex> lock(mtx);
        dataQueue.push(i);
        std::cout << "Производитель: добавлен элемент " << i << std::endl;
        lock.unlock();
        
        cv.notify_one();
    }
    
    std::unique_lock<std::mutex> lock(mtx);
    finished = true;
    lock.unlock();
    cv.notify_all();
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        
        cv.wait(lock, [] { return !dataQueue.empty() || finished; });
        
        if (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            lock.unlock();
            
            std::cout << "Потребитель " << id << ": получен элемент " << data << std::endl;
        } else if (finished) {
            break;
        }
    }
}
Организация потоков и синхронизация
Программирование сетевых приложений

Тупики (Deadlocks), несколько потоков ожидают друг друга:

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void threadFunction1() {
    std::lock_guard<std::mutex> lock1(mutex1);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::lock_guard<std::mutex> lock2(mutex2);
    std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}

void threadFunction2() {
    std::lock_guard<std::mutex> lock2(mutex2);
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::lock_guard<std::mutex> lock1(mutex1);
    std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}

int main() {
    std::thread t1(threadFunction1);
    std::thread t2(threadFunction2);
    
    t1.join();
    t2.join();
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Предотвращение тупиков с std::lock

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex1;
std::mutex mutex2;

void threadFunction1() {
    std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
    
    std::lock(lock1, lock2); // Безопасная блокировка
    
    std::cout << "Поток 1: получены оба мьютекса" << std::endl;
}

void threadFunction2() {
    std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
    
    std::lock(lock1, lock2); // Безопасная блокировка
    
    std::cout << "Поток 2: получены оба мьютекса" << std::endl;
}

int main() {
    std::thread t1(threadFunction1);
    std::thread t2(threadFunction2);
    
    t1.join();
    t2.join();
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Коммуникация между потоками в Qt (сигналы и слоты)

#include <QCoreApplication>
#include <QThread>
#include <QDebug>
#include <QTimer>

class Worker : public QObject {
    Q_OBJECT
    
private:
    int counter;
    
public:
    Worker() : counter(0) {}
    
public slots:
    void process() {
        ++counter;
        emit progress(counter);
        
        if (counter >= 10) {
            emit finished();
        } else {
            QTimer::singleShot(1000, this, &Worker::process);
        }
    }
    
signals:
    void progress(int value);
    void finished();
};

class Controller : public QObject {
    Q_OBJECT
    
private:
    Worker* worker;
    QThread* workerThread;
    
public:
    Controller() {
        worker = new Worker;
        workerThread = new QThread(this);
        
        worker->moveToThread(workerThread);
        
        connect(workerThread, &QThread::started, worker, &Worker::process);
        connect(worker, &Worker::progress, this, &Controller::onProgress);
        connect(worker, &Worker::finished, this, &Controller::onFinished);
        connect(worker, &Worker::finished, workerThread, &QThread::quit);
        connect(workerThread, &QThread::finished, worker, &Worker::deleteLater);
        connect(workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
        
        workerThread->start();
    }
    
public slots:
    void onProgress(int value) {
        qDebug() << "Прогресс:" << value;
    }
    
    void onFinished() {
        qDebug() << "Работа завершена";
        QCoreApplication::quit();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Потокобезопасные очереди

#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(const T& item) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            queue.push(item);
        }
        cv.notify_one();
    }
    
    bool pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !queue.empty(); });
        
        if (queue.empty()) {
            return false;
        }
        
        item = queue.front();
        queue.pop();
        return true;
    }
    
    bool try_pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        
        if (queue.empty()) {
            return false;
        }
        
        item = queue.front();
        queue.pop();
        return true;
    }
    
    size_t size() const {
        std::unique_lock<std::mutex> lock(mtx);
        return queue.size();
    }
};
Организация потоков и синхронизация
Программирование сетевых приложений

Планирование потоков (приоритеты в Qt)

#include <QThread>
#include <QDebug>

class PriorityWorker : public QThread {
    Q_OBJECT
    
protected:
    void run() override {
        qDebug() << "Поток запущен с приоритетом:" << priority();
        
        for (int i = 0; i < 5; ++i) {
            qDebug() << "Работа в потоке с приоритетом" << priority();
            msleep(1000);
        }
    }
};

int main(int argc, char *argv[]) {
    QCoreApplication app(argc, argv);
    
    PriorityWorker lowPriorityWorker;
    PriorityWorker normalPriorityWorker;
    PriorityWorker highPriorityWorker;
    
    lowPriorityWorker.setPriority(QThread::LowestPriority);
    normalPriorityWorker.setPriority(QThread::NormalPriority);
    highPriorityWorker.setPriority(QThread::HighestPriority);
    
    lowPriorityWorker.start();
    normalPriorityWorker.start();
    highPriorityWorker.start();
    
    lowPriorityWorker.wait();
    normalPriorityWorker.wait();
    highPriorityWorker.wait();
    
    return 0;
}
Организация потоков и синхронизация
Программирование сетевых приложений

Заключение

Многопоточность - это мощный инструмент, который требует тщательного проектирования и понимания. Ключевые принципы:

  1. Минимизация общих ресурсов - чем меньше данных разделяется между потоками, тем проще обеспечить синхронизацию
  2. Использование высокоуровневых абстракций - Qt предоставляет мощные механизмы для работы с потоками
  3. Предотвращение тупиков - всегда блокировать мьютексы в одинаковом порядке
  4. Правильное завершение потоков - использовать механизмы сигналов/слотов для безопасного завершения
  5. Тестирование - многопоточные приложения требуют тщательного тестирования на наличие гонок и тупиков

Понимание этих концепций позволяет создавать эффективные и надежные многопоточные приложения на C++ с использованием Qt.

Организация потоков и синхронизация
Программирование сетевых приложений

Вопросы для самопроверки

  1. Какие существуют способы создания потоков в C++?
  2. Чем отличается std::thread от QThread?
  3. Какие механизмы синхронизации существуют в C++?
  4. Что такое тупик и как его предотвратить?
  5. Как организовать коммуникацию между потоками в Qt?
  6. Когда следует использовать атомарные операции вместо мьютексов?
  7. Какие преимущества дает использование потокобезопасных контейнеров?
Организация потоков и синхронизация